package guide;

import java.util.Random;

import org.zeromq.*;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

//
// Paranoid Pirate worker
//
/**
 * Worker side of the Paranoid Pirate Protocol (PPP).
 *
 * <p>The worker connects a DEALER socket to the queue at
 * {@code tcp://localhost:5556}, announces itself with a READY frame, echoes
 * back every 3-frame request it receives, and exchanges single-frame
 * heartbeats with the queue. If nothing arrives from the queue for
 * {@link #HEARTBEAT_LIVENESS} consecutive poll timeouts, the worker sleeps
 * for a back-off interval, throws its socket away and opens a fresh one.
 *
 * <p>To exercise the queue, the worker deliberately simulates a crash or a
 * CPU overload at random once a few requests have been handled.
 */
public class ppworker
{
    private final static int HEARTBEAT_LIVENESS = 3;     //  3-5 is reasonable
    private final static int HEARTBEAT_INTERVAL = 1000;  //  msecs
    private final static int INTERVAL_INIT      = 1000;  //  Initial reconnect
    private final static int INTERVAL_MAX       = 32000; //  After exponential backoff

    //  Paranoid Pirate Protocol constants
    private final static String PPP_READY     = "\001"; //  Signals worker is ready
    private final static String PPP_HEARTBEAT = "\002"; //  Signals worker heartbeat

    /**
     * Creates a new DEALER socket, connects it to the Paranoid Pirate queue
     * and sends the READY frame on it.
     *
     * @param ctx context that creates (and therefore owns) the socket
     * @return the connected socket, with READY already sent
     */
    private static Socket worker_socket(ZContext ctx)
    {
        Socket worker = ctx.createSocket(SocketType.DEALER);
        worker.connect("tcp://localhost:5556");

        //  Tell queue we're ready for work
        System.out.println("I: worker ready\n");
        ZFrame frame = new ZFrame(PPP_READY);
        frame.send(worker, 0);

        return worker;
    }

    /**
     * Runs the worker loop until polling or receiving fails, a sleep is
     * interrupted, or a simulated crash is triggered.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        try (ZContext ctx = new ZContext()) {
            Socket worker = worker_socket(ctx);

            //  Keep the index handed back by the poller so that the socket
            //  can be re-registered after a reconnect without assuming
            //  which slot it lands in.
            Poller poller = ctx.createPoller(1);
            int workerIndex = poller.register(worker, Poller.POLLIN);

            //  If liveness hits zero, queue is considered disconnected
            int liveness = HEARTBEAT_LIVENESS;
            int interval = INTERVAL_INIT;

            //  Send out heartbeats at regular intervals
            long heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;

            Random rand = new Random(System.nanoTime());
            int cycles = 0;
            while (true) {
                int rc = poller.poll(HEARTBEAT_INTERVAL);
                if (rc == -1) {
                    break; //  Interrupted
                }

                if (poller.pollin(workerIndex)) {
                    //  Get message
                    //  - 3-part envelope + content -> request
                    //  - 1-part HEARTBEAT -> heartbeat
                    ZMsg msg = ZMsg.recvMsg(worker);
                    if (msg == null) {
                        break; //  Interrupted
                    }

                    if (msg.size() == 3) {
                        //  To test the robustness of the queue implementation
                        //  we simulate various typical problems, such as the
                        //  worker crashing, or running very slowly. We do this
                        //  after a few cycles so that the architecture can get
                        //  up and running first:
                        cycles++;
                        if (cycles > 3 && rand.nextInt(5) == 0) {
                            System.out.println("I: simulating a crash\n");
                            msg.destroy();
                            break;
                        }
                        else if (cycles > 3 && rand.nextInt(5) == 0) {
                            System.out.println("I: simulating CPU overload\n");
                            try {
                                Thread.sleep(3000);
                            }
                            catch (InterruptedException e) {
                                msg.destroy();
                                break;
                            }
                        }
                        System.out.println("I: normal reply\n");
                        //  Echo the request back to the queue unchanged
                        msg.send(worker);
                        liveness = HEARTBEAT_LIVENESS;
                        try {
                            Thread.sleep(1000); //  Do some heavy work
                        }
                        catch (InterruptedException e) {
                            break;
                        }
                    }
                    else if (msg.size() == 1) {
                        //  When we get a heartbeat message from the queue, it
                        //  means the queue was (recently) alive, so reset our
                        //  liveness indicator:
                        ZFrame frame = msg.getFirst();
                        String frameData = new String(
                            frame.getData(), ZMQ.CHARSET
                        );
                        if (PPP_HEARTBEAT.equals(frameData)) {
                            liveness = HEARTBEAT_LIVENESS;
                        }
                        else {
                            System.out.println("E: invalid message\n");
                            msg.dump(System.out);
                        }
                        msg.destroy();
                    }
                    else {
                        System.out.println("E: invalid message\n");
                        msg.dump(System.out);
                        //  Release the rejected message like every other branch
                        msg.destroy();
                    }
                    //  Any traffic from the queue resets the reconnect back-off
                    interval = INTERVAL_INIT;
                }
                else if (--liveness == 0) {
                    //  If the queue hasn't sent us heartbeats in a while,
                    //  destroy the socket and reconnect. This is the simplest
                    //  most brutal way of discarding any messages we might have
                    //  sent in the meantime.
                    System.out.println(
                        "W: heartbeat failure, can't reach queue\n"
                    );
                    System.out.printf(
                        "W: reconnecting in %d msec\n", interval
                    );
                    try {
                        Thread.sleep(interval);
                    }
                    catch (InterruptedException e) {
                        //  Stop the worker, as the other interrupted sleeps do
                        break;
                    }

                    if (interval < INTERVAL_MAX) {
                        interval *= 2;
                    }

                    //  The poller must stop watching the old socket before it
                    //  is destroyed, and must be told about the replacement;
                    //  otherwise it keeps polling a dead socket and the new
                    //  one is never read.
                    poller.unregister(worker);
                    ctx.destroySocket(worker);
                    worker = worker_socket(ctx);
                    workerIndex = poller.register(worker, Poller.POLLIN);
                    liveness = HEARTBEAT_LIVENESS;
                }

                //  Send heartbeat to queue if it's time
                long now = System.currentTimeMillis();
                if (now > heartbeat_at) {
                    heartbeat_at = now + HEARTBEAT_INTERVAL;
                    System.out.println("I: worker heartbeat\n");
                    ZFrame frame = new ZFrame(PPP_HEARTBEAT);
                    frame.send(worker, 0);
                }
            }
        }
    }
}
